--- title: Challenge 1: dolphin instance segmentation keywords: fastai sidebar: home_sidebar summary: "The goal of this challenge is to find all instances of dolphins in a picture and then color the pixels of each dolphin with a unique color." description: "The goal of this challenge is to find all instances of dolphins in a picture and then color the pixels of each dolphin with a unique color." nb_path: "notebooks/01_Dolphin_instance_segmentation_challenge.ipynb" ---
import pandas as pd
import seaborn as sns
We start by downloading and visualizing the dataset, which contains 200 photographs with one or more dolphins, split into a training set of 160 photographs and a validation set of 40 photographs.
from dolphins_recognition_challenge.datasets import get_dataset, display_batches
# Build the train/validation DataLoaders for the segmentation task and
# preview a couple of batches inline to sanity-check the data.
data_loader, data_loader_test = get_dataset("segmentation", batch_size=3)
display_batches(data_loader, n_batches=2, width=600)
In order to prevent overfitting, which happens when the dataset size is too small, we perform a number of transformations to increase the size of the dataset. One transformation implemented in the Torch vision library is RandomHorizontalFlip, and we will implement MyColorJitter, which is basically just a wrapper around the torchvision.transforms.ColorJitter class. However, we cannot use this class directly without a wrapper because a transformation could possibly affect targets and not just the image. For example, if we were to implement RandomCrop, we would need to crop segmentation masks and readjust bounding boxes as well.
def _flip_coco_person_keypoints(kps, width):
flip_inds = [0, 2, 1, 4, 3, 6, 5, 8, 7, 10, 9, 12, 11, 14, 13, 16, 15]
flipped_data = kps[:, flip_inds]
flipped_data[..., 0] = width - flipped_data[..., 0]
# Maintain COCO convention that if visibility == 0, then x, y = 0
inds = flipped_data[..., 2] == 0
flipped_data[inds] = 0
return flipped_data
class Compose(object):
    """Chain several (image, target)-aware transforms into one callable.

    Unlike torchvision's Compose, every transform here receives and returns
    the pair (image, target), so target-altering transforms are supported.
    """

    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, target):
        pair = (image, target)
        for transform in self.transforms:
            pair = transform(*pair)
        return pair
class RandomHorizontalFlip(object):
    """Mirror the image and its targets left-right with probability `prob`."""

    def __init__(self, prob):
        self.prob = prob

    def __call__(self, image, target):
        # Guard clause: with probability (1 - prob) the pair passes through.
        if not (random.random() < self.prob):
            return image, target
        width = image.shape[-1]
        flipped_image = image.flip(-1)
        boxes = target["boxes"]
        # x-coordinates mirror around the image width; x0 and x1 swap roles.
        boxes[:, [0, 2]] = width - boxes[:, [2, 0]]
        target["boxes"] = boxes
        if "masks" in target:
            target["masks"] = target["masks"].flip(-1)
        if "keypoints" in target:
            target["keypoints"] = _flip_coco_person_keypoints(
                target["keypoints"], width
            )
        return flipped_image, target
class ToTensor(object):
    """Convert the image (a PIL image) to a torch tensor; the target passes
    through untouched."""

    def __call__(self, image, target):
        return F.to_tensor(image), target
class MyColorJitter:
    """(image, target)-aware wrapper around torchvision's ColorJitter.

    Only the image is jittered; color changes can never affect the boxes or
    masks in the target, so the target is returned unchanged.
    """

    def __init__(self, brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5):
        self.torch_color_jitter = torchvision.transforms.ColorJitter(
            brightness=brightness,
            contrast=contrast,
            saturation=saturation,
            hue=hue,
        )

    def __call__(self, image, target):
        return self.torch_color_jitter(image), target
We will make a series of transformations on an image, and we will combine all those transformations into a single one as follows:
def get_tensor_transforms(train):
    """Build the (image, target) transform pipeline for the dataset.

    Always converts the PIL image to a tensor; when `train` is True, adds
    color jitter and random horizontal flips for data augmentation.
    """
    # converts the image, a PIL image, into a PyTorch Tensor
    pipeline = [ToTensor()]
    if train:
        # during training, randomly perturb colors and flip the training
        # images (and their ground truth) for data augmentation
        pipeline.append(
            MyColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5)
        )
        pipeline.append(RandomHorizontalFlip(0.5))
    # TODO: add additional transforms: e.g. random crop
    return Compose(pipeline)
# Rebuild the loaders with the augmentation pipeline plugged in, then preview
# a couple of augmented batches to visually sanity-check the transforms.
data_loader, data_loader_test = get_dataset("segmentation", batch_size=2, get_tensor_transforms=get_tensor_transforms)
display_batches(data_loader, n_batches=2, width=800)
With data augmentation defined, we are ready to generate the actual datasets used for training our models.
# Final loaders used for training: same augmentation pipeline, batch size 4.
batch_size = 4
data_loader, data_loader_test = get_dataset(
    "segmentation", get_tensor_transforms=get_tensor_transforms, batch_size=batch_size
)
display_batches(data_loader, n_batches=4, width=800)
{% include tip.html content='incorporate more transformation classes such as RandomCrop etc. (https://pytorch.org/docs/stable/torchvision/transforms.html)' %}
def get_instance_segmentation_model(hidden_layer_size):
    """Build a Mask R-CNN model fine-tunable for dolphin segmentation.

    Starts from a COCO-pretrained ResNet50-FPN Mask R-CNN and replaces both
    the box head and the mask head so they predict our two classes.

    Args:
        hidden_layer_size: channel width of the mask predictor's hidden layer.
    Returns:
        The adapted torchvision Mask R-CNN model.
    """
    # our dataset has two classes only - background and dolphin
    num_classes = 2
    # load an instance segmentation model pre-trained on COCO
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(
        pretrained=True
    )  # box_score_thresh=0.5
    # swap the pre-trained box classification head for a two-class one
    box_in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(box_in_features, num_classes)
    # swap the mask head as well, sized by hidden_layer_size
    mask_in_channels = model.roi_heads.mask_predictor.conv5_mask.in_channels
    model.roi_heads.mask_predictor = MaskRCNNPredictor(
        in_channels=mask_in_channels,
        dim_reduced=hidden_layer_size,
        num_classes=num_classes,
    )
    return model
# Train on GPU when available, otherwise fall back to CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# get the model using our helper function
model = get_instance_segmentation_model(hidden_layer_size=256)
# move model to the right device
model.to(device)
# construct an SGD optimizer over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
# and a learning rate scheduler which decreases the learning rate by
# 10x every 10 epochs (step_size=10, gamma=0.1)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
import utils
import math
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq):
    """Train `model` for one pass over `data_loader`.

    Adapted from the torchvision detection reference training loop: moves each
    batch to `device`, sums the per-head losses returned by the model, and
    steps the optimizer. Aborts the process if the loss becomes non-finite.

    Args:
        model: detection model returning a dict of losses in train mode.
        optimizer: optimizer over the model's trainable parameters.
        data_loader: yields (images, targets) batches.
        device: torch device to train on.
        epoch: current epoch index (warmup is applied when it is 0).
        print_freq: log metrics every `print_freq` iterations.
    Returns:
        The utils.MetricLogger accumulated over the epoch.
    """
    model.train()
    logger = utils.MetricLogger(delimiter=" ")
    logger.add_meter('lr', utils.SmoothedValue(window_size=1, fmt='{value:.6f}'))
    header = 'Epoch: [{}]'.format(epoch)

    # Linear learning-rate warmup during the very first epoch only.
    warmup_scheduler = None
    if epoch == 0:
        warmup_factor = 1. / 1000
        warmup_iters = min(1000, len(data_loader) - 1)
        warmup_scheduler = utils.warmup_lr_scheduler(optimizer, warmup_iters, warmup_factor)

    for images, targets in logger.log_every(data_loader, print_freq, header):
        images = [img.to(device) for img in images]
        targets = [{key: val.to(device) for key, val in tgt.items()} for tgt in targets]

        loss_dict = model(images, targets)
        total_loss = sum(loss_dict.values())

        # reduce losses over all GPUs for logging purposes
        reduced_dict = utils.reduce_dict(loss_dict)
        reduced_total = sum(reduced_dict.values())
        loss_value = reduced_total.item()

        if not math.isfinite(loss_value):
            print("Loss is {}, stopping training".format(loss_value))
            print(reduced_dict)
            sys.exit(1)

        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        if warmup_scheduler is not None:
            warmup_scheduler.step()

        logger.update(loss=reduced_total, **reduced_dict)
        logger.update(lr=optimizer.param_groups[0]["lr"])

    return logger
# let's train it for 20 epochs
num_epochs = 20
print("Training...")
for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    # update the learning rate
    lr_scheduler.step()
    # evaluate on the test dataset
    # evaluate(model, data_loader_test, device=device)
# NOTE(review): `evaluate` is not defined in this file — presumably imported
# from the torchvision detection reference `engine` module; verify.
evaluate(model, data_loader_test, device=device)
# pick one image from the test set
img, _ = data_loader_test.dataset[0]
# put the model in evaluation mode
model.eval()
# inference only — skip autograd bookkeeping
with torch.no_grad():
    prediction = model([img.to(device)])
# a list with one dict per input image ("masks" and "scores" are used below)
prediction
from dolphins_recognition_challenge.datasets import stack_imgs
def show_pred(dl, n=None, score_limit=0.5, width=600):
dataset_test = dl.dataset
if n == None:
n = len(dataset_test)
for i in range(n):
img = dataset_test[i][0]
img_bg = Image.fromarray(img.mul(255).permute(1, 2, 0).byte().numpy())
images = [img_bg]
model.eval()
with torch.no_grad():
prediction = model([img.to(device)])
predicted_masks = prediction[0]["masks"]
scores = prediction[0]["scores"]
for i in range(predicted_masks.shape[0]):
score = scores[i]
if score >= score_limit:
bg = img_bg.copy()
fg = Image.fromarray(predicted_masks[i, 0].mul(255).byte().cpu().numpy())
bg.paste(fg.convert("RGB"), (0, 0), fg)
images.append(bg)
display(stack_imgs(images, width))
# Visualize predictions on the whole validation set at a 0.5 score threshold.
show_pred(data_loader_test, score_limit=0.5, width=1200)
def iou_metric(
    binary_segmentation: np.ndarray,
    binary_gt_label: np.ndarray,
) -> float:
    """
    Compute the IOU between two binary segmentations (typically one predicted
    and one ground truth).

    Input:
        binary_segmentation: binary 2D numpy array representing the region of
            interest as segmented by the algorithm
        binary_gt_label: binary 2D numpy array representing the region of
            interest as provided in the database
    Output:
        IOU: intersection-over-union between the segmentation and the ground
            truth, smoothed so two empty masks give 1.0 instead of 0/0.
    """
    # FIX: np.int / np.bool were removed in NumPy 1.24, so the original
    # hard-coded alias list crashes on modern NumPy. Accept any integer or
    # boolean dtype instead.
    for arr in (binary_segmentation, binary_gt_label):
        assert np.issubdtype(arr.dtype, np.integer) or arr.dtype == np.bool_
        assert arr.ndim == 2
    # turn all variables to booleans, just in case
    seg = binary_segmentation.astype(bool)
    gt = binary_gt_label.astype(bool)
    # count the intersection and union pixels
    intersection = float(np.logical_and(seg, gt).sum())
    union = float(np.logical_or(seg, gt).sum())
    # compute the IOU; smoothing avoids 0/0 when both masks are empty
    # (the original comment wrongly called this the Dice coefficient)
    smooth = 0.001
    IOU = (intersection + smooth) / (union + smooth)
    return IOU
from torchvision.transforms import ToPILImage
def get_true_and_predicted_masks(
    model: torchvision.models.detection.mask_rcnn.MaskRCNN,
    example: Tuple[torch.Tensor, Dict[str, torch.Tensor]],
    score_limit: float = 0.5,
) -> Tuple[PIL.Image.Image, Dict[str, np.ndarray]]:
    """ Returns a PIL image and dictionary containing both true and predicted masks as numpy arrays.

    The dict has keys "true" (ground-truth masks from the example's target)
    and "predicted" (model masks whose score exceeds `score_limit`), both
    shaped (num_masks, H, W).
    """
    img = example[0]
    # Ground-truth masks scaled from {0, 1} to {0, 255}.
    # NOTE(review): .astype(np.int8) wraps 255 to -1 (int8 overflow); the IOU
    # code only tests non-zero-ness so this still works, but uint8 would be
    # the natural dtype — verify before reusing these arrays elsewhere.
    true_masks = (
        example[1]["masks"].mul(255).cpu().numpy().astype(np.int8)
    )
    model.eval()
    # inference only — no autograd bookkeeping
    with torch.no_grad():
        predictions = model([img.to(device)])
    pred_scores = predictions[0]["scores"].cpu().numpy()
    # drop the singleton channel axis: (N, 1, H, W) -> (N, H, W)
    pred_masks = predictions[0]["masks"].squeeze(1).mul(255).cpu().numpy().astype(np.int8)
    # keep only the masks whose confidence exceeds score_limit
    pred_masks = np.squeeze(pred_masks[np.argwhere(pred_scores > score_limit), :, :], 1)
    return ToPILImage()(img), {"true": true_masks, "predicted": pred_masks}
# Compare true vs. predicted masks for the first validation image.
img, masks = get_true_and_predicted_masks(model, data_loader_test.dataset[0], 0.5)
print(f'We have {masks["true"].shape[0]} dolphins on the photo, total of {masks["predicted"].shape[0]} are predicted with score higher than 0.5')
display(img.resize((600, 450)))
# Pairwise IOU between every predicted mask (rows) and every ground-truth
# mask (columns) for the example above.
# FIX: the original referenced `pred_masks`/`true_masks`, which are locals of
# get_true_and_predicted_masks and undefined at this scope; use the returned
# `masks` dict instead.
true_masks = masks["true"]
pred_masks = masks["predicted"]
metrics = np.array([
    [
        iou_metric(
            binary_segmentation=pred_masks[j, :, :],
            binary_gt_label=true_masks[i, :, :],
        )
        for i in range(true_masks.shape[0])
    ]
    for j in range(pred_masks.shape[0])
])
# Render the IOU matrix with a green gradient: darker cell = better overlap.
cm = sns.light_palette("green", as_cmap=True)
df = pd.DataFrame(metrics)
df.style.background_gradient(cmap=cm)
# Quick interactive sanity checks on the types flowing through the pipeline.
type(data_loader_test.dataset[0][1]["masks"])
type(model)
def drop_max_row_and_column(df: pd.DataFrame) -> Tuple[float, pd.DataFrame]:
    """Pop the largest value in `df` and drop its row and column.

    Args:
        df: numeric DataFrame (e.g. the predicted-vs-true IOU matrix).
    Returns:
        (max_value, remainder) where `remainder` is `df` without the row and
        column containing the maximum.
    """
    # Reset both axes to positional labels so integer row/column arithmetic
    # below addresses the right cells.
    df = df.copy().reset_index(drop=True).T.reset_index(drop=True).T
    n_col = df.shape[1]
    flat_idx = df.values.argmax()
    row, col = divmod(flat_idx, n_col)
    max_value = df.loc[row, col]
    remainder = df.drop(index=row).drop(columns=col)
    return max_value, remainder


def my_way(df) -> List[float]:
    """Greedily match rows to columns by IOU: repeatedly take the global
    maximum and remove its row and column, returning the maxima in order.

    Args:
        df: numeric DataFrame; recursion stops when either axis is empty.
    Returns:
        List of the successive maxima (one per matched row/column pair).
    """
    # FIX: the original guard `len(df.shape) < 0` was always False (a length
    # can never be negative); an empty frame is detected via its dimensions.
    if df.shape[0] == 0 or df.shape[1] == 0:
        return []
    max_value, remainder = drop_max_row_and_column(df)
    return [max_value] + my_way(remainder)
# Greedy one-to-one matching of predicted to ground-truth masks by IOU.
my_way(df)